import time
from concurrent.futures import wait, ALL_COMPLETED
from multiprocessing.pool import ThreadPool

from common.ObjectPool import ObjectPool
import asyncio
import threading
import requests

from common.ThreadSafeDict import ThreadSafeDict
from common.logger import logger
from common.retryUtils import async_retry, retry
from common.timeUtil import calc_time, calc_async_time

data = dict()
data_lock = ThreadSafeDict()


async def create_obj():
    return dict()


pool = ObjectPool(create_obj, 10)


@async_retry(3)
async def error_test_r():
    logger.info("async error_test")
    await asyncio.sleep(1)
    logger.info("error_test ok")
    # raise RuntimeError("async error_test")


@retry(3)
def error_test():
    raise RuntimeError("error_test")


# @retry(3)
def request_test():
    """遇到IO事件阻塞，非并发操作"""
    logger.info(f"{threading.current_thread().name} request_test")
    res = requests.get("https://www.baidu.com")
    logger.info(res)
    return res


async def arequest():
    return requests.get("https://www.baidu.com")


async def arequest_test():
    """遇到IO事件阻塞，非并发操作"""
    logger.info(f"{threading.current_thread().name} request_test")
    res = await arequest()
    logger.info(res)
    return res


# @async_retry(3)
async def async_request_test():
    """多线程执行阻塞函数"""
    logger.info(f"{threading.current_thread().name} async request_test")
    loop = asyncio.get_event_loop()
    res = await loop.run_in_executor(None, lambda: requests.get("https://www.baidu.com"))
    # logger.info(res)
    return res


def set_data(k):
    data[k] = k


async def aset_data(k):
    data[k] = k


# @async_retry(3)
async def run_in_executor_test():
    """多线程执行阻塞函数"""
    logger.info(f"{threading.current_thread().name} async run_in_executor_test")
    loop = asyncio.get_event_loop()
    # res = [data_lock.set(str(i), i) for i in range(10000)]  # 0.06-0.078
    # res = [set_data(str(i)) for i in range(10000)]  # 0.03
    # res = [aset_data(str(i)) for i in range(10000)]  # 1.06 ~ 1.29
    res = [loop.run_in_executor(ThreadPool, lambda x: set_data(x), str(i)) for i in range(10000)]  # 4.3
    # await asyncio.sleep(0.1)
    # res = [loop.run_in_executor(ThreadPool, lambda x: data_lock.set(x, x), str(i)) for i in range(10000)]  # 4.4
    await asyncio.gather(*res)
    # logger.info(f"data: {data.__len__()} data_lock: {data_lock.size()}")
    return res


def executor_test():
    """执行阻塞函数"""
    logger.info(f"{threading.current_thread().name} executor_test")
    res = [set_data(i) for i in range(10000)]  # 4.3
    return res


async def pool_test():
    for i in range(10):
        obj = await pool.acquire()
        logger.info(f"pool {pool.size()} obj {id(obj)}")
        pool.release(obj)


async def for_test(flag):
    if flag:
        yield 1
    data = [list(range(int(i / 2), 10)) for i in range(1, 10)]
    for i in data:
        yield i


async def for_from_test(flag):
    async for i in for_test(flag):
        print(i)
        if i == flag:
            break


def for_test():
    for i in range(10):
        time.sleep(1)
        yield i


@calc_async_time
async def amain():
    task = []
    # [request_test() for _ in range(100)] # 1.2  100*: 14s
    # task = [error_test_r() for _ in range(100)]  # 1s
    # task = [arequest_test() for _ in range(100)]  # 10*: 2.8s 100*: 13s
    task = [async_request_test() for _ in range(100)]  # 0.2 ~ 0.4s   100*: 1~1.8s

    # task = [run_in_executor_test() for _ in range(100)]  # ~ 4.5s 100*: 58s
    # task = [asyncio.create_task(run_in_executor_test()) for _ in range(100)]  # ~ 4.5s  100*:143s
    await asyncio.gather(*task)  # 并发执行携程


async def for_main():
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(ThreadPool, lambda x: list(for_test()), i) for i in range(2)]
    data = await asyncio.gather(*tasks)
    logger.info(data)


@calc_time
def tmain():
    tasks = []
    with ThreadPool as t:
        for _ in range(10000):
            # tasks.append(t.submit(request_test()))  # 1.28s
            # tasks.append(t.submit(lambda :asyncio.run(arequest_test()))) # 1.28s - 1.4
            tasks.append(t.submit(lambda: asyncio.run(async_request_test())))  # 1s ~ 1.4s
        wait(tasks, return_when=ALL_COMPLETED)


if __name__ == "__main__":
    # error_test()
    asyncio.run(for_main())
    # tmain()
